"""Anonymous telemetry for Reflex."""

from __future__ import annotations

import asyncio
import dataclasses
import multiprocessing
import platform
import warnings
from contextlib import suppress
from datetime import datetime, timezone

import httpx
import psutil

from reflex import constants
from reflex.config import environment
from reflex.utils import console
from reflex.utils.prerequisites import ensure_reflex_installation_id, get_project_hash

UTC = timezone.utc
POSTHOG_API_URL: str = "https://app.posthog.com/capture/"


def get_os() -> str:
    """Get the operating system.

    Returns:
        The operating system.
    """
    return platform.system()


def get_detailed_platform_str() -> str:
    """Get the detailed os/platform string.

    Returns:
        The platform string
    """
    return platform.platform()


def get_python_version() -> str:
    """Get the Python version.

    Returns:
        The Python version.
    """
    # Remove the "+" from the version string in case user is using a pre-release version.
    return platform.python_version().rstrip("+")


def get_reflex_version() -> str:
    """Get the Reflex version.

    Returns:
        The Reflex version.
    """
    return constants.Reflex.VERSION


def get_cpu_count() -> int:
    """Get the number of CPUs.

    Returns:
        The number of CPUs.
    """
    return multiprocessing.cpu_count()


def get_memory() -> int:
    """Get the total memory in MB.

    Returns:
        The total memory in MB.
    """
    try:
        return psutil.virtual_memory().total >> 20
    except ValueError:  # needed to pass ubuntu test
        return 0


def _raise_on_missing_project_hash() -> bool:
    """Check if an error should be raised when project hash is missing.

    When running reflex with --backend-only, or doing database migration
    operations, there is no requirement for a .web directory, so the reflex.json
    file may not exist, and this should not be considered an error.

    Returns:
        False when compilation should be skipped (i.e. no .web directory is required).
        Otherwise return True.
    """
    return not environment.REFLEX_SKIP_COMPILE.get()


def _prepare_event(event: str, **kwargs) -> dict:
    """Prepare the event to be sent to the PostHog server.

    Args:
        event: The event name.
        kwargs: Additional data to send with the event.

    Returns:
        The event data.
    """
    from reflex.utils.prerequisites import get_cpu_info

    installation_id = ensure_reflex_installation_id()
    project_hash = get_project_hash(raise_on_fail=_raise_on_missing_project_hash())

    if installation_id is None or project_hash is None:
        console.debug(
            f"Could not get installation_id or project_hash: {installation_id}, {project_hash}"
        )
        return {}

    stamp = datetime.now(UTC).isoformat()

    cpuinfo = get_cpu_info()

    additional_keys = ["template", "context", "detail", "user_uuid"]
    additional_fields = {
        key: value for key in additional_keys if (value := kwargs.get(key)) is not None
    }
    return {
        "api_key": "phc_JoMo0fOyi0GQAooY3UyO9k0hebGkMyFJrrCw1Gt5SGb",
        "event": event,
        "properties": {
            "distinct_id": installation_id,
            "distinct_app_id": project_hash,
            "user_os": get_os(),
            "user_os_detail": get_detailed_platform_str(),
            "reflex_version": get_reflex_version(),
            "python_version": get_python_version(),
            "cpu_count": get_cpu_count(),
            "memory": get_memory(),
            "cpu_info": dataclasses.asdict(cpuinfo) if cpuinfo else {},
            **additional_fields,
        },
        "timestamp": stamp,
    }


def _send_event(event_data: dict) -> bool:
    try:
        httpx.post(POSTHOG_API_URL, json=event_data)
    except Exception:
        return False
    else:
        return True


def _send(event: str, telemetry_enabled: bool | None, **kwargs):
    from reflex.config import get_config

    # Get the telemetry_enabled from the config if it is not specified.
    if telemetry_enabled is None:
        telemetry_enabled = get_config().telemetry_enabled

    # Return if telemetry is disabled.
    if not telemetry_enabled:
        return False

    with suppress(Exception):
        event_data = _prepare_event(event, **kwargs)
        if not event_data:
            return False
        return _send_event(event_data)


def send(event: str, telemetry_enabled: bool | None = None, **kwargs):
    """Send anonymous telemetry for Reflex.

    Args:
        event: The event name.
        telemetry_enabled: Whether to send the telemetry (If None, get from config).
        kwargs: Additional data to send with the event.
    """

    async def async_send(event: str, telemetry_enabled: bool | None, **kwargs):
        return _send(event, telemetry_enabled, **kwargs)

    try:
        # Within an event loop context, send the event asynchronously.
        asyncio.create_task(async_send(event, telemetry_enabled, **kwargs))
    except RuntimeError:
        # If there is no event loop, send the event synchronously.
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        _send(event, telemetry_enabled, **kwargs)


def send_error(error: Exception, context: str):
    """Send an error event.

    Args:
        error: The error to send.
        context: The context of the error (e.g. "frontend" or "backend")

    Returns:
        Whether the telemetry was sent successfully.
    """
    return send("error", detail=type(error).__name__, context=context)
